Java 线程池 - ThreadPoolExecutor

前言

线程池是多并发编程中经常用到,了解是线程池的使用和原理是java程序员的必修课。编写多线程的程序推荐使用线程池而不是自己创建线程,因为线程池中的线程可以复用,复用线程可以降低线程创建和销毁的资源消耗,线程池帮助管理、调度、监控线程,可以防止无限制的创建线程,消耗完系统资源

简单使用

几种常用的线程池:

  • newCachedThreadPool,可缓存的线程池,线程池容量几乎为无限(Interger. MAX_VALUE),当一个新线程任务提交,如果线程池没有空闲的线程,则创建一个新线程执行,否则使用空闲线程。
  • newFixedThreadPool,定长的线程池,线程池维持定长的线程,即使没有任务运行也不会关闭线程,当新任务提交,线程池中线程都在执行,则将任务放入阻塞队列中排队等待空闲线程运行。
  • newScheduledThreadPool,定长的线程池,支持定时及周期性执行任务,newFixedThreadPool的特殊化,线程池容量为1
  • newSingleThreadExecutor,线程池只有一个线程,所有任务按照提交的先后顺序执行

使用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class TreadPoolTest {
public static void executeThread(ExecutorService es){
for (int i = 0; i < 10;i++){
es.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName());
}catch (InterruptedException e){

}
}
});
}
}


public static void cachedThreadPoolTest() {
ExecutorService es = Executors.newCachedThreadPool();
executeThread(es);
es.shutdown();
}

public static void fixedThreadPoolTest(){
ExecutorService es = Executors.newFixedThreadPool(3);
executeThread(es);
es.shutdown();
}

public static void singleThreadPoolTest(){
ExecutorService es = Executors.newSingleThreadExecutor();
executeThread(es);
es.shutdown();
}

public static void scheduleThreadPoolTest1(){
ScheduledExecutorService es = Executors.newScheduledThreadPool(5);
for (int i = 0;i < 10;i++){
es.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}, 1, 3, TimeUnit.SECONDS);
}
es.shutdown();
}

public static void scheduleThreadPoolTest2(){
ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
es.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}, 1, 3, TimeUnit.SECONDS);
}


public static void main(String[] args) {
// cachedThreadPoolTest();
// fixedThreadPoolTest();
// singleThreadPoolTest();
// scheduleThreadPoolTest1();
// scheduleThreadPoolTest2();
}

}

线程池实现

线程池类的关系图:

线程池类的关系图

Executor接口:定义了运行新任务的execute接口

ExecutorService接口:继承了Executor接口,增加添加了一些用来管理执行器生命周期和任务生命周期的方法、支持Future任务的方法

AbstractExecutorService抽象类:实现了一些ExecutorService的通用接口

ThreadPoolExecutor类:实现了线程池的完整的方法

Scheduled相关的类:有关定期执行任务

Executors:是个工厂类,内部实际上是根据不同的线程池选择不同的参数生产ThreadPoolExecutor对象

因此线程池的实现方法主要在ThreadPoolExecutor,接下来会重点分析ThreadPoolExecutor来了解java线程池的实现

Executors

当我们使用线程池通常是通过Executors的静态方法得到上述的几种线程池,其实Executors是个工厂类,内部是返回ThreadPoolExecutor的实例化对象,ScheduledThreadPoolExecutor提供定时启动线程执行任务的线程池,这里先不做介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
// 确保单线程不被破坏
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

可以看出是通过在实例化ThreadPoolExecutor对象时传入不同的参数得到不同的线程池,接下来我们具体看ThreadPoolExecutor类

ThreadPoolExecutor

常用变量

ctl是ThreadPoolExecutor的重要变量,记录了线程池的运行状态runState和线程池有效的线程数workerCount(这里的workerCount指的正在运行任务的线程数,不算入空闲的线程数)。

ctl是一个AtomicInteger类,其后29位记录workerCount的数量,因此总共可以记录(2^29)-1 (约5亿),第30位记录了线程池的运行状态,总共有五种状态:

  • RUNNING:接受新任务并且能够处理阻塞队列中的任务
  • SHUTDOWN:不能接受新任务但是能够继续处理阻塞队列中的任务,当线程池处于RUNNING状态时,调用shutdown()方法进入该状态,这时线程池不接受新的任务,但是任然会继续处理完池中正在运行的任务和阻塞队列中的任务
  • STOP:不能接受新任务也不处理队列中的任务,并且会中断正在运行的任务,当线程池处于RUNNING状态时,调用shutdownNow()方法进入该状态

  • TIDYING:所有的任务都结束,线程池的数量为0,由SHUTDOWN或STOP状态转化而来,线程池进入该状态会调用terminated()的钩子方法

  • TERMINATED:terminated()方法调用完毕,线程池生命周期结束

线程池状态转化图:

线程池状态转化

线程池同时维护了线程的集合(其中存放Worker对象,即线程池中线程的封装对象)和一个ReentrantLock锁,后文会看到其使用

1
2
3
private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 线程池的重要参数,前三位表示线程池的状态,后29位表示池中的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造函数传入的字段主要为:

  • corePoolSize:线程池的核心线程数,线程池维持的定长线程数,如果线程池的线程数量小于线程池的核心线程数,即使线程上没有任务也不会关闭
  • maximumPoolSize:线程池中的最大的线程数量
  • keepAliveTime:当线程池中的线程大于corePoolSize,空闲线程将会等待新任务,如果等待时间超过keepAliveTime,将会结束该空闲线程
  • unit:keepAliveTime的时间单位
  • workQueue:阻塞队列,当任务超过corePoolSize,将会进入等待队列,该队列是一个阻塞队列

  • threadFactory:用来创建新线程的工厂类,用户可以传入定制的threadFactory来创建定制的线程

  • handler:线程池的饱和策略,当线程池线程数量大于等于maximumPoolSize并且等待队列满时,新的任务将会交给handler处理

当新的任务提交时,先后会通过与corePoolSize,workQueue,maximumPoolSize,handler判断任务的去向

  • 如果线程池的线程数小于corePoolSize,则会创建新线程运行任务,即使线程池中还有空闲线程。

  • 如果线程池的线程数大于等于corePoolSize

    • 如果等待队列中还没满,则将其放入等待队列中
    • 如果等待队列满
      • 如果线程池的线程数小于maximumPoolSize,则创建新线程运行任务
      • 如果线程池的线程数大于等于maximumPoolSize,则交给handler来处理

线程池的线程如果执行完任务处于空闲状态时:

  • 如果线程池的线程数小于corePoolSize,则继续保留空闲线程,等待新任务
  • 如果线程池的线程数大于等于corePoolSize,则空闲线程等待新任务,等待时间超过keepAliveTime,则会回收空闲线程

任务提交

任务通过execute方法提交 ,如上述所说的,如果线程池中线程个数小于corePoolSize,调用addWorker判断并新建一个线程执行任务,否则尝试将任务放入等待队列中,如果等待队列满了,则需要和线程池的最大容量比较,如果比其小,则新建一个线程运行,addWorker方法中的第二个参表示比较是corePoolSize还是maximumPoolSize,从下文的adddWorker方法中可以看到。如果都失败,则调用reject方法将任务交给handle处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 如果线程池中线程数量小于corePoolSize,新建一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池的转态为RUNNING,则将任务放入等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果等待队列已经满了
else if (!addWorker(command, false))
reject(command);
}

addWorker用于创建一个新的线程用来执行任务,firstTask是线程的第一个任务,core为true时表示当前线程池的线程数量小于corePoolSize,false表示数量大于corePoolSize且小于maximumPoolSize,因为不论是线程池的数量小于corePoolSize,还是等待队列满,线程池的线程数量大于corePoolSize且小于maximumPoolSize都需要创建新线程,这是两种情况下创建新线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

/**
* 当线程池的状态大于等于SHUTDOWN时,说明线程池已经不再接受新任务了
* 当一下条件任何一个不满足时,则返回false
* rs == SHUTDOWN:线程池的状态为SHUTDOWN,
* firstTask == null:新任务为空
* ! workQueue.isEmpty():等待队列不为空
* 当rs == SHUTDOWN时,此时不能接受新任务,因此如果新任务不为空,返回false,
* 如果队列为空说明队列中已经的没有新任务,此时准备转移到TIDYING状态,队列不需要添加新的任务,返回false
* 即在rs >= SHUTDOWN时,除非是线程池处于SHUTDOWN转态,且任务为空,并且等待队列不为空,否则一律不创建新线程
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

// 循环通过CAS修改workerCount
for (;;) {
int wc = workerCountOf(c);
// 根据core,比较corePoolSize或者maximumPoolSize的值,大于则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 修改workerCount的值,加1,成功则跳出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 状态未改变,需要回到第一层for循环重头开始判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将任务封装成Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,同步,防止在运行新建线程的过程线程池的状态发生了变化
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果线程池处于RUNNING状态,或者处于SHUTDOWN状态且任务为空,则加入线程到工作集中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将其加入到工作集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果确认加入,则启动该线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker方法用于新建一个线程,启动并加入到workers当中

通过addWorker方法看来,只有当线程池中线程的数量符合要求,并且线程池的状态处于RUNNING或者处于SHUTDOWN并且新任务为空的时候才会新建线程运行任务,新线程在运行之前需要加锁并判断是否符合运行条件,加锁防止线程池的状态在这之间发生了变化,最后通过t.start()启动新线程,实际上Worker是一个Runnable,t是新建Worker对象w的Thread封装,start启动的是Worker线程,运行worker的run方法,通过接下来查看Worker代码可以看到。

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 传入Worker对象创建新线程
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

可以看到Worker实现了Runnable,在构造函数中通过getThreadFactory().newThread(this)创建了新线程,因此在addWorker方法中启动的实际上是Worker对象的线程,线程调用start启动实际上是运行run方法中的代码,Worker中的run方法中调用了runWorker方法,在看runWorker方法之前,我们先来看Worker还继承了AQS,使用AQS实现独占锁的功能。

  • 为什要设置锁:因为Worker对象不管是在初始化还是在运行过程当中都不希望被中断,使用AQS独占锁在获取独占锁之后,线程是不会被中断,只有线程运行结束才会进行中断的判断(详见AQS独占锁实现),同时我们可以通过判断线程是否占有独占锁来判断线程是否在运行,通过isHeldExclusively方法返回false说明所已经被占有,线程正在运行。
  • 为什么不用ReentrantLock:通过tryAcquire方法可以看到,Worker的锁是不可重入的,这是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

同时Worker在初始化时是不能被中断,也不能获得其锁,getState(-1),所以在runWorker方法中一开始需要unlock。

接下来让我们看看runWorker方法,了解线程池中的线程是怎么被复用的

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获得第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任务的为空,则从等待队列中获取一个任务,从这可以看出,第一个任务是可以为空的,
// 如果第一个任务为空,则启动的线程会从等待队列中获取任务,但是等待队列中的任务都是不能为空的,getTask返回空说明获取任务失败:可能是线程池结束,也可能是等待任务时间超过keepAliveTime
while (task != null || (task = getTask()) != null) {
//
w.lock();
// 如果线程池的状态不为RUNNING和SHUTDOWN,则需要中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 空方法,用于用户继承,在任务开始前添加操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 运行任务,任务虽然是Runnable形式,但是并不是作为一个线程来运行
// 而是在Worker线程中,对一个个任务调run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法,用于用户继承,在任务结束后添加操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

runWorker方法是将worker中的任务作为第一个任务,while循环不断从等待队列取出任务执行,在循环中的过程:

  • 首先判断线程池的状态是不是STOP或者后续转态,如果是且线程不是中断转态,则需要中断线程,if语句的写法有点看不懂。

  • 调用beforeExecute方法,执行任务运行前的操作

  • 调用任务的run方法,执行任务

  • 调用afterExecute方法,执行任务结束后的操作

ThreadPoolExecutor中的beforeExecute和afterExecute方法都为空,留给子类实现在任务执行前后的操作。最后如果等待队列没有元素或者等待任务超过了等待的最长时间,退出循环,执行processWorkerExit方法。

可以从上面看出,虽然传入的任务是Runnable类型,但是实际运行并没有将任务作为一个独立的线程运行(没有将任务封装成Thread,调用start运行),而是worker对象的线程不断从等待队列中获取任务,运行任务的run方法,线程池就是通过这样的方式复用线程:启用一个线程,调用任务的run方法,而不是将任务当成线程启动。

接下来我们来看从等待队列获取任务的方法getTask

getTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 如果getTask方法返回null,则在runWorker中就会退出循环,销毁线程
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程池转态为STOP或者之后的状态,返回null
// 如果线程池的转态为SHUTDOWN,并且等待队列为空,返回null
// 返回null,则外部线程就会被销毁,因此线程池的线程数量减1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 获取任务是否有等待的最长时间的标志
// allowCoreThreadTimeOut默认为false,可以由allowCoreThreadTimeOut来设置
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果线程池数量过多或者获取任务时间超时,返回null
// 但是要保证线程的数量大于一个如果在等待队列不为空的情况下
// 返回null,则外部线程就会被销毁,因此线程池的线程数量减1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 如果设置等待超时,则调用阻塞队列的poll,在等待时间内获取,否则获取为null
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

在getTask中线程的获取任务的最大等待时间实际上转变为阻塞队列调用poll上设置的最长等待时间。

回到runWorker的最后,我们来看如何销毁线程

processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果线程被中断,无法对线程操作,工作线程数减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计的完成的任务数,从workers中移除线程
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

// 每一个线程的结束都需要判断线程池是否可以结束
tryTerminate();

int c = ctl.get();
// 如果线程池处于RUNNING或者SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// 如果线程是正常结束的,进入if
if (!completedAbruptly) {
// 设置最小线程数,保证线程池处于上述转态下有线程运行任务
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果线程被中断或者线程正常结束后小于线程池的维持的线程数,新启动一个线程
addWorker(null, false);
}
}

如果线程是正常结束,只要从worker集合中移除就好了,线程池维持的工作线程数量如果是线程正常结束,已经getTask减去1了,如果线程是中断意外结束的,需要在processWorkerExit中减去1(if (completedAbruptly){decrementWorkerCount()})。

每一个线程结束之后都需要调用tryTerminate判断是否线程池可以结束,尝试将线程池的转态转为TERMINATED,后文的tryTerminate方法中可以看到

最后要确保线程池中线程的数量,对于中断意外退出线程的情况,需要新建一个线程

如果没有设置过allowCoreThreadTimeOut,则要保证线程池中的线程数量不小于corePoolSize,否则需要新建一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 常数将线程池的转态转为TERMINATED
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果线程池的转态为RUNNING或者线程池已经处于结束转态
// 或者线程池处于SHUTDOWN但是等待队列中还有任务,都不需要操作,直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 运行到这说明线程池的状态处于STOP或者处于SHUTDOWN且等待队列中没有任务了
// 如果线程池中的还有线程,需要全部中断
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将状态转移到TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 调用terminated,运行完之后线程池的状态变为TERMINATED
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

其余方法

剩下的线程池的方法不一一细讲,介绍一下方法的作用

  • shutdown:将线程池的状态转为SHUTDOWN,不接受新任务,中断空闲线程,但是会将正在执行的任务和等待队列中的任务执行完
  • shutdownNow:将线程池的状态转为SHUTDOWN,不接受新任务,并将线程池运行的线程全部中断,等待队列清空

  • getTaskCount:线程池已经执行和未执行的任务总数

  • getCompletedTaskCount:线程池已经执行完的线程数

  • getLargestPoolSize:线程池中同时存在最大线程数

  • getPoolSize:线程池当前的线程数量

  • getActiveCount:当前线程池中正在执行任务的线程数量

总结

线程池中任务提交处理和线程的处理方式:

当一个新任务通过execute提交

  1. 首先判断线程池线程数量,如果小于corePoolSize,则创建一个新线程处理,该任务作为其第一个任务
  2. 如果大于corePoolSize并且等待队列中未满,放入等待队列中
  3. 如果大于corePoolSize并且等待队列中已经满了,则需要和maximumPoolSize比较
    • 如果大于maximumPoolSize,则交给饱和策略handle处理
    • 如果小于maximumPoolSize,则创建一个新线程处理,该任务作为其第一个任务

当创建一个新的Worker对象线程时:

  1. 在新建一个Worker对象之前首先需要将workCount通过CAS加1,接着新建一个Worker对象,将其放入workers中
  2. Worker初始化将自身对象传入Thread中,创建一个新线程,调用start,启动线程
  3. 线程不断从等待队列中取出任务,调用其run运行任务
  4. 在从等待队列中取任务会发生阻塞,如果当前线程池数量大于corePoolSize或者设置allowCoreThreadTimeOut为true,当阻塞时间超过keepAliveTime,就销毁线程
  5. 每一个线程结果判断线程池是否可以结束,如果是则转移线程池的转态到TERMINATED,同时需要判断线程池的线程是否小于corePoolSize或者设置allowCoreThreadTimeOut为true下的1,如果小则addWorker

参考

深入理解Java线程池:ThreadPoolExecutor

0%